32f26642885fe9975103dc95321bb83c36273934,src/main/java/com/continuuity/data/operation/ttqueue/TTQueueNewOnVCTable.java,FifoDequeueStrategy,fetchNextEntries,#QueueConsumer#QueueConfig#QueueStateImpl#ReadPointer#,790

Before Change


        // TODO: use a raw Get instead of the workaround of incrementing by zero
        // TODO: move counters into oracle
        long groupReadPointetr = table.incrementAtomicDirtily(makeRowKey(GROUP_READ_POINTER, consumer.getGroupId()), GROUP_READ_POINTER, 0);
        if(groupReadPointetr + config.getBatchSize() >= queueState.getQueueWritePointer()) {
          // Reached the end of the queue according to the cached QueueWritePointer;
          // read it again to see whether producers have made any progress.
          // TODO: use a raw Get instead of the workaround of incrementing by zero

After Change


    }

    @Override
    public List<Long> fetchNextEntries(QueueConsumer consumer, QueueConfig config, QueueStateImpl queueState, ReadPointer readPointer) throws OperationException {
      List<Long> newEntryIds = new ArrayList<Long>();

      // If claimed entries exist, return them
      long claimedEntryIdBegin = queueState.getClaimedEntryBegin();
      long claimedEntryIdEnd = queueState.getClaimedEntryEnd();
      if(claimedEntryIdBegin != INVALID_ENTRY_ID && claimedEntryIdEnd != INVALID_ENTRY_ID &&
        claimedEntryIdEnd >= claimedEntryIdBegin) {
        for(long i = claimedEntryIdBegin; i <= claimedEntryIdEnd; ++i) {
          newEntryIds.add(i);
        }
        return newEntryIds;
      }

      final long batchSize = getBatchSize(config);

      // Else claim new queue entries to process
      QueuePartitioner partitioner=config.getPartitionerType().getPartitioner();